package org.ykx.demo

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.Row

object SparkTest {

  /** Run everything locally with all available cores. */
  val conf = new SparkConf().setMaster("local[*]").setAppName("SparkTest")

  /** HDFS directory where RDD checkpoint data is materialized.
   *  Fixed: the original URL contained a doubled slash (`8020//tmp`). */
  val checkpointDirectory = "hdfs://10.10.61.192:8020/tmp/checkpoint"

  /** Shared SparkContext with the checkpoint directory configured at
   *  construction time (previously a dangling statement in the object body). */
  val sc = {
    val ctx = new SparkContext(conf)
    ctx.setCheckpointDir(checkpointDirectory)
    ctx
  }

  val sqlContext = new SQLContext(sc)

  // NOTE(review): never used below — creating it eagerly spins up Hive
  // machinery for nothing. Kept as a public val for backward compatibility;
  // consider making it `lazy val` or removing if no external caller needs it.
  val hiveContext = new HiveContext(sc)

  /** Demo entry point: checkpoints a small pair RDD, then builds a DataFrame
   *  from an explicit schema and registers it as a temp table. */
  def main(args: Array[String]): Unit = {
    val data1 = Array[(Int, Char)]((1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e'), (3, 'f'), (2, 'g'), (1, 'h'))
    val pairs1 = sc.parallelize(data1, 3)

    val data2 = Array[(Int, Char)]((1, 'A'), (2, 'B'), (3, 'C'), (4, 'D'))
    val pairs2 = sc.parallelize(data2, 2)

    // Persist before checkpointing so the checkpoint job can reuse the cached
    // partitions instead of recomputing the lineage from scratch.
    pairs2.persist(StorageLevel.DISK_ONLY)
    pairs2.checkpoint()
    // count() is the action that actually triggers both the computation and
    // the checkpoint write.
    println("[RDD Count]： " + pairs2.count())

//    val result = pairs1.join(pairs2)
//    result.checkpoint()

    // NOTE(review): defined but never used — the DataFrame below is built from
    // Row + an explicit StructType instead of this case class.
    case class Purchase(customer_id: Int, purchase_id: Int, date: String, time: String, tz: String, amount: Double)

    // Explicit schema matching the Row fields below, column for column.
    val schema = StructType(StructField("customer_id", IntegerType) ::
                                StructField("purchase_id", IntegerType) ::
                                StructField("date", StringType) ::
                                StructField("time", StringType) ::
                                StructField("tz", StringType) ::
                                StructField("amount", DoubleType) :: Nil)

    //import sqlContext.implicits._
    val rdd = sc.parallelize(Array(
      Row(123, 234, "2007-12-12", "20:50", "UTC", 500.99),
      Row(123, 247, "2007-12-12", "15:30", "PST", 300.22),
      Row(189, 254, "2007-12-13", "00:50", "EST", 122.19),
      Row(187, 299, "2007-12-12", "07:30", "UTC", 524.37)))

    val df = sqlContext.createDataFrame(rdd, schema)
    // registerTempTable is the Spark 1.x API (replaced by
    // createOrReplaceTempView in 2.x); kept to match this file's Spark version.
    df.registerTempTable("tmp_df")
  }
}